feat(ISV-7020): add chunking capabilities and cluster-error handling #17

BorekZnovustvoritel merged 5 commits into main
Conversation
Force-pushed from 4ca7357 to b245377
Force-pushed from b245377 to d51e1fd
* chunking enabled only in retry topics
* tests assisted with Claude

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.6-Opus
Force-pushed from d51e1fd to 32ffcfe
```python
    ],
    group_id="test-chunks",
    timeout=30.0,
    split_messages=True,
```
You can verify locally that unsetting this breaks the test, because the message cannot fit in the topic. Which means:
- Chunking fixes the issue
- Chunking is off by default
Also check the integration logs for debug messages about reassembled MessageGroups.
@RichardPlesnik please take a look. This will be used for retry topics only and requires no further configuration in any cluster or consumer service. Producer services that are not consumed through this library will still need to be updated with additional configuration.
Whoops, that review wasn't finished, stand by...
bclindner left a comment
Implementation looks generally sound, but I'm worried about an edge case here. Consider a situation where a producer sends all but one chunk of a message before its process is killed, for whatever reason, and the broker and consumer remain online.
- Tiny problem I see here is ChunkingCache lacks any sort of expiry or method of discarding those chunks, or even a way to know when to discard them, right? I think they'd just be stuck in memory until the consumer process is restarted.
- Way bigger problem is the commit offset - if I'm reading this right, if we lose any chunk of a message, wouldn't that make us unable to commit higher offsets, basically stalling the partition out?
If we have a way to address this maybe we could get some tests & ITs in to prove the edge case has a clean resolution in CI.
Thanks for raising the concern. You are right that there is currently no invalidation of the chunking cache other than a successful read. However, the behavior is not quite as you stated: a MessageGroup is submitted to the tracking manager only once the whole group is successfully reassembled, so an incomplete MessageGroup won't block anything, because the trackingManager has no information about it.

This logic holds as long as the producer sends the MessageGroup to the cluster in sequence, without interruptions. The only scenario I can see breaking it is if the producer produces the start of the MessageGroup, then some standalone message, and then the rest of the group. If the consumer crashes right after consuming the standalone message and commits it before consuming the rest of the group, the first part of the group is lost. This could be fixed by merging the chunkingCache and trackingManager so that incomplete messages are tracked too.

There could also be other incomplete MessageGroups, either when the producer crashes while sending the group or when the cluster's data retention policy destroys the first message(s) of the group. Currently, the only way of clearing them would be restarting the consumer application. I agree that some invalidation (with logged warnings) can be added to make this more robust. I will take another look at this; hopefully I will be able to address all the edge cases we currently see.
* enable chunk expiration
* tests assisted with Claude

Signed-off-by: Marek Szymutko <mszymutk@redhat.com>
Assisted-by: Claude-4.6-Opus
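The expiry discussed above could look roughly like the following sketch. This is a hypothetical illustration: the class name, method signatures, and the TTL default are assumptions, not the library's actual API.

```python
import time


class ChunkingCache:
    """Hypothetical sketch of a chunk cache with time-based expiry.

    Incomplete MessageGroups whose first chunk arrived more than
    `ttl_seconds` ago are discarded, so a producer crash mid-group
    cannot leak memory indefinitely. A real implementation would log
    a warning when dropping a stale group.
    """

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        # group_id -> (first_seen_timestamp, {chunk_index: payload}, total_chunks)
        self._groups: dict[str, tuple[float, dict[int, bytes], int]] = {}

    def add_chunk(self, group_id: str, index: int, total: int, payload: bytes):
        """Store a chunk; return the reassembled bytes once all chunks arrived."""
        self._expire()
        first_seen, chunks, _ = self._groups.get(
            group_id, (time.monotonic(), {}, total)
        )
        chunks[index] = payload
        if len(chunks) == total:
            # Complete: drop the group from the cache and hand back the value.
            self._groups.pop(group_id, None)
            return b"".join(chunks[i] for i in range(total))
        self._groups[group_id] = (first_seen, chunks, total)
        return None

    def _expire(self):
        """Drop groups whose first chunk is older than the TTL."""
        now = time.monotonic()
        stale = [
            gid
            for gid, (seen, _, _) in self._groups.items()
            if now - seen > self.ttl
        ]
        for gid in stale:
            del self._groups[gid]  # real code: log a warning here
```

Only a completed group is ever handed to the caller, which matches the invariant described above: the tracking manager never learns about a group that was not fully reassembled.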
I think your explanation + changes adequately address things, thanks and sorry for the extra work! Will need to watch out for those warnings and make sure dropped messages can be addressed anywhere this chunking has to happen (though I assume in any decent EDA you'd have contingencies for that kind of thing).
Thanks for the great work! LGTM 👍
This PR introduces a custom message chunking protocol, enabled by default in retry topics only. The work required creating a new data object, MessageGroup, which holds multiple messages whose values must be concatenated to recover the transferred data. The new feature is unit and integration tested. Consumers can now seamlessly consume both normal and chunked messages, even from the same partition; the changes were made to be backwards-compatible.

This feature cannot be turned on for producers whose topics are consumed by different libraries, as their clients cannot read such messages; the implemented solution is custom.
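On the producer side, the splitting half of such a protocol could be sketched as follows. This is a minimal illustration under stated assumptions: the header names (`group-id`, `chunk-index`, `chunk-total`) and the returned record shape are hypothetical, not the library's actual wire format.

```python
import uuid


def split_message(value: bytes, max_chunk_size: int) -> list[dict]:
    """Hypothetical sketch: split a payload into chunks carrying headers
    that a consumer can use to reassemble the original value.

    Every chunk shares a random group id and knows its own index and
    the total chunk count, so chunks can be matched up on the consumer
    side even when interleaved with other messages in the partition.
    """
    group_id = str(uuid.uuid4())
    chunks = [
        value[i : i + max_chunk_size]
        for i in range(0, len(value), max_chunk_size)
    ] or [b""]  # an empty payload still produces one (empty) chunk
    total = len(chunks)
    return [
        {
            "value": chunk,
            "headers": {
                "group-id": group_id,
                "chunk-index": str(i),
                "chunk-total": str(total),
            },
        }
        for i, chunk in enumerate(chunks)
    ]
```

Concatenating the `value` fields of all records in index order yields the original payload, which is the reassembly step the MessageGroup object performs on the consumer side.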